1 package org.apache.solr.common.cloud;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import java.io.Closeable;
21 import java.io.Serializable;
22 import java.io.UnsupportedEncodingException;
23 import java.lang.invoke.MethodHandles;
24 import java.net.URLDecoder;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.TimeUnit;
37
38 import org.apache.solr.common.Callable;
39 import org.apache.solr.common.SolrException;
40 import org.apache.solr.common.SolrException.ErrorCode;
41 import org.apache.solr.common.util.Pair;
42 import org.apache.solr.common.util.Utils;
43 import org.apache.zookeeper.CreateMode;
44 import org.apache.zookeeper.KeeperException;
45 import org.apache.zookeeper.WatchedEvent;
46 import org.apache.zookeeper.Watcher;
47 import org.apache.zookeeper.Watcher.Event.EventType;
48 import org.apache.zookeeper.data.Stat;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import static java.util.Arrays.asList;
53 import static java.util.Collections.EMPTY_MAP;
54 import static java.util.Collections.emptyMap;
55 import static java.util.Collections.emptySet;
56 import static java.util.Collections.unmodifiableSet;
57 import static org.apache.solr.common.util.Utils.fromJSON;
58
59 public class ZkStateReader implements Closeable {
60 private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
61
62 public static final String BASE_URL_PROP = "base_url";
63 public static final String NODE_NAME_PROP = "node_name";
64 public static final String CORE_NODE_NAME_PROP = "core_node_name";
65 public static final String ROLES_PROP = "roles";
66 public static final String STATE_PROP = "state";
67 public static final String CORE_NAME_PROP = "core";
68 public static final String COLLECTION_PROP = "collection";
69 public static final String ELECTION_NODE_PROP = "election_node";
70 public static final String SHARD_ID_PROP = "shard";
71 public static final String REPLICA_PROP = "replica";
72 public static final String SHARD_RANGE_PROP = "shard_range";
73 public static final String SHARD_STATE_PROP = "shard_state";
74 public static final String SHARD_PARENT_PROP = "shard_parent";
75 public static final String NUM_SHARDS_PROP = "numShards";
76 public static final String LEADER_PROP = "leader";
77 public static final String PROPERTY_PROP = "property";
78 public static final String PROPERTY_VALUE_PROP = "property.value";
79 public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
80 public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds";
81 public static final String COLLECTIONS_ZKNODE = "/collections";
82 public static final String LIVE_NODES_ZKNODE = "/live_nodes";
83 public static final String ALIASES = "/aliases.json";
84 public static final String CLUSTER_STATE = "/clusterstate.json";
85 public static final String CLUSTER_PROPS = "/clusterprops.json";
86 public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
87 public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
88
89 public static final String REPLICATION_FACTOR = "replicationFactor";
90 public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
91 public static final String AUTO_ADD_REPLICAS = "autoAddReplicas";
92
93 public static final String ROLES = "/roles.json";
94
95 public static final String CONFIGS_ZKNODE = "/configs";
96 public final static String CONFIGNAME_PROP="configName";
97
98 public static final String LEGACY_CLOUD = "legacyCloud";
99
100 public static final String URL_SCHEME = "urlScheme";
101
102
103 protected volatile ClusterState clusterState;
104
105 private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
106 private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = 4000;
107
108 public static final String LEADER_ELECT_ZKNODE = "leader_elect";
109
110 public static final String SHARD_LEADERS_ZKNODE = "leaders";
111 public static final String ELECTION_NODE = "election";
112
113
114 private final Set<String> interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
115
116
117 private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
118
119 private int legacyClusterStateVersion = 0;
120
121
122 private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
123
124
125 private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<String, LazyCollectionRef>();
126
127 private volatile Set<String> liveNodes = emptySet();
128
129 private final ZkConfigManager configManager;
130
131 private ConfigData securityData;
132
133 private final Runnable securityNodeListener;
134
135 public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
136 LEGACY_CLOUD,
137 URL_SCHEME,
138 AUTO_ADD_REPLICAS)));
139
140
141
142
143
144
145 public String readConfigName(String collection) {
146
147 String configName = null;
148
149 String path = COLLECTIONS_ZKNODE + "/" + collection;
150 if (log.isInfoEnabled()) {
151 log.info("Load collection config from:" + path);
152 }
153
154 try {
155 byte[] data = zkClient.getData(path, null, null, true);
156
157 if(data != null) {
158 ZkNodeProps props = ZkNodeProps.load(data);
159 configName = props.getStr(CONFIGNAME_PROP);
160 }
161
162 if (configName != null) {
163 if (!zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
164 log.error("Specified config does not exist in ZooKeeper:" + configName);
165 throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
166 "Specified config does not exist in ZooKeeper:" + configName);
167 } else if (log.isInfoEnabled()) {
168 log.info("path={} {}={} specified config exists in ZooKeeper",
169 new Object[] {path, CONFIGNAME_PROP, configName});
170 }
171 } else {
172 throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
173 }
174 }
175 catch (KeeperException e) {
176 throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading config name for collection " + collection, e);
177 }
178 catch (InterruptedException e) {
179 Thread.interrupted();
180 throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading config name for collection " + collection, e);
181 }
182
183 return configName;
184 }
185
186
187 private static class ZKTF implements ThreadFactory {
188 private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
189 @Override
190 public Thread newThread(Runnable r) {
191 Thread td = new Thread(tg, r);
192 td.setDaemon(true);
193 return td;
194 }
195 }
196
197 private final SolrZkClient zkClient;
198
199 private final boolean closeClient;
200
201 private volatile Aliases aliases = new Aliases();
202
203 private volatile boolean closed = false;
204
/**
 * Creates a reader on top of an existing client, without a security node listener.
 *
 * @param zkClient client to use; it is not closed by {@link #close()}
 */
public ZkStateReader(SolrZkClient zkClient) {
  this(zkClient, null);
}

/**
 * Creates a reader on top of an existing client.
 *
 * @param zkClient client to use; it is not closed by {@link #close()}
 * @param securityNodeListener callback run when the security node changes, or null
 */
public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) {
  this.securityNodeListener = securityNodeListener;
  this.closeClient = false;
  this.zkClient = zkClient;
  this.configManager = new ZkConfigManager(zkClient);
}
215
216
/**
 * Creates a reader that builds its own {@code SolrZkClient}; that client is closed by
 * {@link #close()}. On reconnect the client re-runs
 * {@link #createClusterStateWatchersAndUpdate()}.
 *
 * @param zkServerAddress passed through to the SolrZkClient constructor
 * @param zkClientTimeout passed through to the SolrZkClient constructor
 * @param zkClientConnectTimeout passed through to the SolrZkClient constructor
 */
public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) {
  final OnReconnect reinstallWatchers = new OnReconnect() {
    @Override
    public void command() {
      try {
        ZkStateReader.this.createClusterStateWatchersAndUpdate();
      } catch (KeeperException e) {
        log.error("", e);
        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
      } catch (InterruptedException e) {
        // Restore the interrupted status before reporting the failure.
        Thread.currentThread().interrupt();
        log.error("", e);
        throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
      }
    }
  };

  this.zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, reinstallWatchers);
  this.configManager = new ZkConfigManager(zkClient);
  this.closeClient = true;
  this.securityNodeListener = null;
}
242
243 public ZkConfigManager getConfigManager() {
244 return configManager;
245 }
246
247
248
249
250 public void updateClusterState() throws KeeperException, InterruptedException {
251 synchronized (getUpdateLock()) {
252 if (clusterState == null) {
253
254 createClusterStateWatchersAndUpdate();
255 return;
256 }
257
258 refreshLegacyClusterState(null);
259
260 Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
261 for (String coll : safeCopy) {
262 DocCollection newState = fetchCollectionState(coll, null);
263 updateWatchedCollection(coll, newState);
264 }
265 refreshCollectionList(null);
266 refreshLiveNodes(null);
267 constructState();
268 }
269 }
270
271
272 public void updateLiveNodes() throws KeeperException, InterruptedException {
273 refreshLiveNodes(null);
274 }
275
276 public Aliases getAliases() {
277 return aliases;
278 }
279
280 public Integer compareStateVersions(String coll, int version) {
281 DocCollection collection = clusterState.getCollectionOrNull(coll);
282 if (collection == null) return null;
283 if (collection.getZNodeVersion() < version) {
284 log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
285 DocCollection nu = getCollectionLive(this, coll);
286 if (nu == null) return -1 ;
287 if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
288 updateWatchedCollection(coll, nu);
289 collection = nu;
290 }
291 }
292
293 if (collection.getZNodeVersion() == version) {
294 return null;
295 }
296
297 log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
298
299 return collection.getZNodeVersion();
300 }
301
302 public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
303 InterruptedException {
304
305
306 log.info("Updating cluster state from ZooKeeper... ");
307
308
309 if (!zkClient.exists(CLUSTER_STATE, true)) {
310 throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
311 "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
312 }
313
314
315 refreshLegacyClusterState(new LegacyClusterStateWatcher());
316 refreshStateFormat2Collections();
317 refreshCollectionList(new CollectionsChildWatcher());
318 refreshLiveNodes(new LiveNodeWatcher());
319
320 synchronized (ZkStateReader.this.getUpdateLock()) {
321 constructState();
322
323 zkClient.exists(ALIASES,
324 new Watcher() {
325
326 @Override
327 public void process(WatchedEvent event) {
328
329
330 if (EventType.None.equals(event.getType())) {
331 return;
332 }
333 try {
334 synchronized (ZkStateReader.this.getUpdateLock()) {
335 log.info("Updating aliases... ");
336
337
338 final Watcher thisWatch = this;
339 Stat stat = new Stat();
340 byte[] data = zkClient.getData(ALIASES, thisWatch, stat ,
341 true);
342
343 Aliases aliases = ClusterState.load(data);
344
345 ZkStateReader.this.aliases = aliases;
346 }
347 } catch (KeeperException e) {
348 if (e.code() == KeeperException.Code.SESSIONEXPIRED
349 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
350 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
351 return;
352 }
353 log.error("", e);
354 throw new ZooKeeperException(
355 SolrException.ErrorCode.SERVER_ERROR, "", e);
356 } catch (InterruptedException e) {
357
358 Thread.currentThread().interrupt();
359 log.warn("", e);
360 return;
361 }
362 }
363
364 }, true);
365 }
366 updateAliases();
367
368 if (securityNodeListener != null) {
369 addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH, new Callable<Pair<byte[], Stat>>() {
370 @Override
371 public void call(Pair<byte[], Stat> pair) {
372 ConfigData cd = new ConfigData();
373 cd.data = pair.getKey() == null || pair.getKey().length == 0 ? EMPTY_MAP : Utils.getDeepCopy((Map) fromJSON(pair.getKey()), 4, false);
374 cd.version = pair.getValue() == null ? -1 : pair.getValue().getVersion();
375 securityData = cd;
376 securityNodeListener.run();
377 }
378 });
379 securityData = getSecurityProps(true);
380 }
381 }
382
383 private void addSecuritynodeWatcher(final String path, final Callable<Pair<byte[], Stat>> callback)
384 throws KeeperException, InterruptedException {
385 zkClient.exists(SOLR_SECURITY_CONF_PATH,
386 new Watcher() {
387
388 @Override
389 public void process(WatchedEvent event) {
390
391
392 if (EventType.None.equals(event.getType())) {
393 return;
394 }
395 try {
396 synchronized (ZkStateReader.this.getUpdateLock()) {
397 log.info("Updating {} ... ", path);
398
399
400 final Watcher thisWatch = this;
401 Stat stat = new Stat();
402 byte[] data = getZkClient().getData(path, thisWatch, stat, true);
403 try {
404 callback.call(new Pair<>(data, stat));
405 } catch (Exception e) {
406 if (e instanceof KeeperException) throw (KeeperException) e;
407 if (e instanceof InterruptedException) throw (InterruptedException) e;
408 log.error("Error running collections node listener", e);
409 }
410 }
411 } catch (KeeperException e) {
412 if (e.code() == KeeperException.Code.SESSIONEXPIRED
413 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
414 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
415 return;
416 }
417 log.error("", e);
418 throw new ZooKeeperException(
419 ErrorCode.SERVER_ERROR, "", e);
420 } catch (InterruptedException e) {
421
422 Thread.currentThread().interrupt();
423 log.warn("", e);
424 return;
425 }
426 }
427
428 }, true);
429 }
430
431
432
433
434
435 private void constructState() {
436
437
438 Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
439
440
441 for (String coll : interestingCollections) {
442 if (!result.containsKey(coll) && !watchedCollectionStates.containsKey(coll)) {
443 new StateWatcher(coll).refreshAndWatch(true);
444 }
445 }
446
447
448 for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
449 if (!result.containsKey(entry.getKey())) {
450 result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
451 }
452 }
453
454
455 for (Map.Entry<String, LazyCollectionRef> entry : lazyCollectionStates.entrySet()) {
456 if (!result.containsKey(entry.getKey())) {
457 result.put(entry.getKey(), entry.getValue());
458 }
459 }
460
461 this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
462 log.debug("clusterStateSet: version {} legacy {} interesting {} watched {} lazy {} total {}",
463 clusterState.getZkClusterStateVersion(),
464 legacyCollectionStates.keySet(),
465 interestingCollections,
466 watchedCollectionStates.keySet(),
467 lazyCollectionStates.keySet(),
468 clusterState.getCollections());
469 }
470
471
472
473
474 private void refreshLegacyClusterState(Watcher watcher)
475 throws KeeperException, InterruptedException {
476 try {
477 Stat stat = new Stat();
478 byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
479 ClusterState loadedData = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), CLUSTER_STATE);
480 synchronized (getUpdateLock()) {
481 this.legacyCollectionStates = loadedData.getCollectionStates();
482 this.legacyClusterStateVersion = stat.getVersion();
483 }
484 } catch (KeeperException.NoNodeException e) {
485
486 synchronized (getUpdateLock()) {
487 this.legacyCollectionStates = emptyMap();
488 this.legacyClusterStateVersion = 0;
489 }
490 }
491 }
492
493
494
495
496 private void refreshStateFormat2Collections() {
497
498 for (String coll : interestingCollections) {
499 new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll));
500 }
501 }
502
503
504
505
506
507
508
509
510
511
512
513
514
515 private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
516 List<String> children = null;
517 try {
518 children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
519 } catch (KeeperException.NoNodeException e) {
520 log.warn("Error fetching collection names");
521
522 }
523 if (children == null || children.isEmpty()) {
524 lazyCollectionStates.clear();
525 return;
526 }
527
528
529
530
531 this.lazyCollectionStates.keySet().retainAll(children);
532 for (String coll : children) {
533
534 if (!interestingCollections.contains(coll)) {
535
536 LazyCollectionRef existing = lazyCollectionStates.get(coll);
537 if (existing == null) {
538 lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
539 }
540 }
541 }
542 }
543
544 private class LazyCollectionRef extends ClusterState.CollectionRef {
545
546 private final String collName;
547
548 public LazyCollectionRef(String collName) {
549 super(null);
550 this.collName = collName;
551 }
552
553 @Override
554 public DocCollection get() {
555
556 return getCollectionLive(ZkStateReader.this, collName);
557 }
558
559 @Override
560 public boolean isLazilyLoaded() {
561 return true;
562 }
563
564 @Override
565 public String toString() {
566 return "LazyCollectionRef(" + collName + ")";
567 }
568 }
569
570
571
572
573 private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
574 Set<String> newLiveNodes;
575 try {
576 List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
577 log.debug("Updating live nodes from ZooKeeper... ({})", nodeList.size());
578 newLiveNodes = new HashSet<>(nodeList);
579 } catch (KeeperException.NoNodeException e) {
580 newLiveNodes = emptySet();
581 }
582 synchronized (getUpdateLock()) {
583 this.liveNodes = newLiveNodes;
584 if (clusterState != null) {
585 clusterState.setLiveNodes(newLiveNodes);
586 }
587 }
588 }
589
590
591
592
593 public ClusterState getClusterState() {
594 return clusterState;
595 }
596
597 public Object getUpdateLock() {
598 return this;
599 }
600
601 public void close() {
602 this.closed = true;
603 if (closeClient) {
604 zkClient.close();
605 }
606 }
607
608 public String getLeaderUrl(String collection, String shard, int timeout)
609 throws InterruptedException, KeeperException {
610 ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection,
611 shard, timeout));
612 return props.getCoreUrl();
613 }
614
615 public Replica getLeader(String collection, String shard) throws InterruptedException {
616 if (clusterState != null) {
617 Replica replica = clusterState.getLeader(collection, shard);
618 if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
619 return replica;
620 }
621 }
622 return null;
623 }
624
625
626
627
628 public Replica getLeaderRetry(String collection, String shard) throws InterruptedException {
629 return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
630 }
631
632
633
634
635 public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
636 long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
637 while (true) {
638 Replica leader = getLeader(collection, shard);
639 if (leader != null) return leader;
640 if (System.nanoTime() >= timeoutAt || closed) break;
641 Thread.sleep(GET_LEADER_RETRY_INTERVAL_MS);
642 }
643 throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
644 + timeout + "ms " + ", collection: " + collection + " slice: " + shard);
645 }
646
647
648
649
650 public static String getShardLeadersPath(String collection, String shardId) {
651 return COLLECTIONS_ZKNODE + "/" + collection + "/"
652 + SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId)
653 : "") + "/leader";
654 }
655
656
657
658
659 public static String getShardLeadersElectPath(String collection, String shardId) {
660 return COLLECTIONS_ZKNODE + "/" + collection + "/"
661 + LEADER_ELECT_ZKNODE + (shardId != null ? ("/" + shardId + "/" + ELECTION_NODE)
662 : "");
663 }
664
665
666 public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName) {
667 return getReplicaProps(collection, shardId, thisCoreNodeName, null);
668 }
669
670 public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
671 Replica.State mustMatchStateFilter) {
672 return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null);
673 }
674
675 public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
676 Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
677 assert thisCoreNodeName != null;
678 ClusterState clusterState = this.clusterState;
679 if (clusterState == null) {
680 return null;
681 }
682 Map<String,Slice> slices = clusterState.getSlicesMap(collection);
683 if (slices == null) {
684 throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
685 "Could not find collection in zk: " + collection + " "
686 + clusterState.getCollections());
687 }
688
689 Slice replicas = slices.get(shardId);
690 if (replicas == null) {
691 throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
692 }
693
694 Map<String,Replica> shardMap = replicas.getReplicasMap();
695 List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size());
696 for (Entry<String,Replica> entry : shardMap.entrySet()) {
697 ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
698
699 String coreNodeName = entry.getValue().getName();
700
701 if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
702 if (mustMatchStateFilter == null || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) {
703 if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) {
704 nodes.add(nodeProps);
705 }
706 }
707 }
708 }
709 if (nodes.size() == 0) {
710
711 return null;
712 }
713
714 return nodes;
715 }
716
717 public SolrZkClient getZkClient() {
718 return zkClient;
719 }
720
721 public void updateAliases() throws KeeperException, InterruptedException {
722 byte[] data = zkClient.getData(ALIASES, null, null, true);
723
724 Aliases aliases = ClusterState.load(data);
725
726 ZkStateReader.this.aliases = aliases;
727 }
728 public Map getClusterProps(){
729 Map result = null;
730 try {
731 if(getZkClient().exists(ZkStateReader.CLUSTER_PROPS, true)){
732 result = (Map) Utils.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ;
733 } else {
734 result= new LinkedHashMap();
735 }
736 return result;
737 } catch (Exception e) {
738 throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading cluster properties",e) ;
739 }
740 }
741
742
743
744
745
746
747
748 public void setClusterProperty(String propertyName, String propertyValue) {
749 if (!KNOWN_CLUSTER_PROPS.contains(propertyName)) {
750 throw new SolrException(ErrorCode.BAD_REQUEST, "Not a known cluster property " + propertyName);
751 }
752
753 for (; ; ) {
754 Stat s = new Stat();
755 try {
756 if (getZkClient().exists(CLUSTER_PROPS, true)) {
757 int v = 0;
758 Map properties = (Map) Utils.fromJSON(getZkClient().getData(CLUSTER_PROPS, null, s, true));
759 if (propertyValue == null) {
760
761 if (properties.get(propertyName) != null) {
762 properties.remove(propertyName);
763 getZkClient().setData(CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
764 }
765 } else {
766
767 if (!propertyValue.equals(properties.get(propertyName))) {
768 properties.put(propertyName, propertyValue);
769 getZkClient().setData(CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
770 }
771 }
772 } else {
773 Map properties = new LinkedHashMap();
774 properties.put(propertyName, propertyValue);
775 getZkClient().create(CLUSTER_PROPS, Utils.toJSON(properties), CreateMode.PERSISTENT, true);
776 }
777 } catch (KeeperException.BadVersionException bve) {
778 log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion());
779
780 continue;
781 } catch (KeeperException.NodeExistsException nee) {
782 log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion());
783
784 continue;
785 } catch (Exception ex) {
786 log.error("Error updating path " + CLUSTER_PROPS, ex);
787 throw new SolrException(ErrorCode.SERVER_ERROR, "Error updating cluster property " + propertyName, ex);
788 }
789 break;
790 }
791 }
792
793
794
795
796
797
798
799 public ConfigData getSecurityProps(boolean getFresh) {
800 if (!getFresh) {
801 if (securityData == null) return new ConfigData(EMPTY_MAP, -1);
802 return new ConfigData(securityData.data, securityData.version);
803 }
804 try {
805 Stat stat = new Stat();
806 if(getZkClient().exists(SOLR_SECURITY_CONF_PATH, true)) {
807 byte[] data = getZkClient()
808 .getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true);
809 return data != null && data.length > 0 ?
810 new ConfigData((Map<String, Object>) Utils.fromJSON(data), stat.getVersion()) :
811 null;
812 }
813 } catch (KeeperException | InterruptedException e) {
814 throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading security properties",e) ;
815 }
816 return null;
817 }
818
819
820
821
822
823
824 public String getBaseUrlForNodeName(final String nodeName) {
825 final int _offset = nodeName.indexOf("_");
826 if (_offset < 0) {
827 throw new IllegalArgumentException("nodeName does not contain expected '_' seperator: " + nodeName);
828 }
829 final String hostAndPort = nodeName.substring(0,_offset);
830 try {
831 final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8");
832 String urlScheme = (String) getClusterProps().get(URL_SCHEME);
833 if(urlScheme == null) {
834 urlScheme = "http";
835 }
836 return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path));
837 } catch (UnsupportedEncodingException e) {
838 throw new IllegalStateException("JVM Does not seem to support UTF-8", e);
839 }
840 }
841
842
843 class StateWatcher implements Watcher {
844 private final String coll;
845
846 StateWatcher(String coll) {
847 this.coll = coll;
848 }
849
850 @Override
851 public void process(WatchedEvent event) {
852 if (!interestingCollections.contains(coll)) {
853
854 log.info("Uninteresting collection {}", coll);
855 return;
856 }
857
858
859
860 if (EventType.None.equals(event.getType())) {
861 return;
862 }
863
864 log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})",
865 (event), coll, ZkStateReader.this.clusterState == null ? 0
866 : ZkStateReader.this.clusterState.getLiveNodes().size());
867
868 refreshAndWatch(true);
869 synchronized (getUpdateLock()) {
870 constructState();
871 }
872 }
873
874
875
876
877
878
879
880
881 public void refreshAndWatch(boolean expectExists) {
882 try {
883 DocCollection newState = fetchCollectionState(coll, this);
884 updateWatchedCollection(coll, newState);
885 } catch (KeeperException.NoNodeException e) {
886 if (expectExists) {
887 log.warn("State node vanished for collection: " + coll, e);
888 }
889 } catch (KeeperException e) {
890 if (e.code() == KeeperException.Code.SESSIONEXPIRED
891 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
892 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
893 return;
894 }
895 log.error("Unwatched collection: " + coll, e);
896 throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
897
898 } catch (InterruptedException e) {
899 Thread.currentThread().interrupt();
900 log.error("Unwatched collection :" + coll, e);
901 }
902 }
903 }
904
905
906 class LegacyClusterStateWatcher implements Watcher {
907
908 @Override
909 public void process(WatchedEvent event) {
910
911
912 if (EventType.None.equals(event.getType())) {
913 return;
914 }
915 log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
916 refreshAndWatch();
917 synchronized (getUpdateLock()) {
918 constructState();
919 }
920 }
921
922
923 public void refreshAndWatch() {
924 try {
925 refreshLegacyClusterState(this);
926 } catch (KeeperException.NoNodeException e) {
927 throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
928 "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
929 } catch (KeeperException e) {
930 if (e.code() == KeeperException.Code.SESSIONEXPIRED
931 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
932 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
933 return;
934 }
935 log.error("", e);
936 throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
937 "", e);
938 } catch (InterruptedException e) {
939
940 Thread.currentThread().interrupt();
941 log.warn("", e);
942 }
943 }
944 }
945
946
947 class CollectionsChildWatcher implements Watcher {
948
949 @Override
950 public void process(WatchedEvent event) {
951
952
953 if (EventType.None.equals(event.getType())) {
954 return;
955 }
956 log.info("A collections change: {}, has occurred - updating...", (event));
957 refreshAndWatch();
958 synchronized (getUpdateLock()) {
959 constructState();
960 }
961 }
962
963
964 public void refreshAndWatch() {
965 try {
966 refreshCollectionList(this);
967 } catch (KeeperException e) {
968 if (e.code() == KeeperException.Code.SESSIONEXPIRED
969 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
970 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
971 return;
972 }
973 log.error("", e);
974 throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
975 "", e);
976 } catch (InterruptedException e) {
977
978 Thread.currentThread().interrupt();
979 log.warn("", e);
980 }
981 }
982 }
983
984
985 class LiveNodeWatcher implements Watcher {
986
987 @Override
988 public void process(WatchedEvent event) {
989
990
991 if (EventType.None.equals(event.getType())) {
992 return;
993 }
994 log.info("A live node change: {}, has occurred - updating... (live nodes size: {})", (event), liveNodes.size());
995 refreshAndWatch();
996 }
997
998 public void refreshAndWatch() {
999 try {
1000 refreshLiveNodes(this);
1001 } catch (KeeperException e) {
1002 if (e.code() == KeeperException.Code.SESSIONEXPIRED
1003 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
1004 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
1005 return;
1006 }
1007 log.error("", e);
1008 throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
1009 "", e);
1010 } catch (InterruptedException e) {
1011
1012 Thread.currentThread().interrupt();
1013 log.warn("", e);
1014 }
1015 }
1016 }
1017
1018 public static DocCollection getCollectionLive(ZkStateReader zkStateReader,
1019 String coll) {
1020 try {
1021 return zkStateReader.fetchCollectionState(coll, null);
1022 } catch (KeeperException e) {
1023 throw new SolrException(ErrorCode.BAD_REQUEST,
1024 "Could not load collection from ZK:" + coll, e);
1025 } catch (InterruptedException e) {
1026 Thread.currentThread().interrupt();
1027 throw new SolrException(ErrorCode.BAD_REQUEST,
1028 "Could not load collection from ZK:" + coll, e);
1029 }
1030 }
1031
1032 private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
1033 String collectionPath = getCollectionPath(coll);
1034 try {
1035 Stat stat = new Stat();
1036 byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
1037 ClusterState state = ClusterState.load(stat.getVersion(), data,
1038 Collections.<String>emptySet(), collectionPath);
1039 ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
1040 return collectionRef == null ? null : collectionRef.get();
1041 } catch (KeeperException.NoNodeException e) {
1042 return null;
1043 }
1044 }
1045
1046 public static String getCollectionPath(String coll) {
1047 return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
1048 }
1049
1050 public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
1051 if (interestingCollections.add(coll)) {
1052 log.info("addZkWatch {}", coll);
1053 new StateWatcher(coll).refreshAndWatch(false);
1054 synchronized (getUpdateLock()) {
1055 constructState();
1056 }
1057 }
1058 }
1059
1060 private void updateWatchedCollection(String coll, DocCollection newState) {
1061 if (newState == null) {
1062 log.info("Deleting data for {}", coll);
1063 watchedCollectionStates.remove(coll);
1064 return;
1065 }
1066
1067
1068 while (true) {
1069 if (!interestingCollections.contains(coll)) {
1070 break;
1071 }
1072 DocCollection oldState = watchedCollectionStates.get(coll);
1073 if (oldState == null) {
1074 if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
1075 log.info("Add data for {} ver {} ", coll, newState.getZNodeVersion());
1076 break;
1077 }
1078 } else {
1079 if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
1080
1081 break;
1082 }
1083 if (watchedCollectionStates.replace(coll, oldState, newState)) {
1084 log.info("Updating data for {} from {} to {} ", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
1085 break;
1086 }
1087 }
1088 }
1089
1090
1091 if (!interestingCollections.contains(coll)) {
1092 watchedCollectionStates.remove(coll);
1093 log.info("Removing uninteresting collection {}", coll);
1094 }
1095 }
1096
1097
1098 public void removeZKWatch(String coll) {
1099 log.info("Removing watch for uninteresting collection {}", coll);
1100 interestingCollections.remove(coll);
1101 watchedCollectionStates.remove(coll);
1102 lazyCollectionStates.put(coll, new LazyCollectionRef(coll));
1103 synchronized (getUpdateLock()) {
1104 constructState();
1105 }
1106 }
1107
1108 public static class ConfigData {
1109 public Map<String, Object> data;
1110 public int version;
1111
1112 public ConfigData() {
1113 }
1114
1115 public ConfigData(Map<String, Object> data, int version) {
1116 this.data = data;
1117 this.version = version;
1118
1119 }
1120 }
1121 }